package day02.transform;

import beans.SensorReading;
import enums.TemperatureType;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Collections;

/**
 * Flink 流处理 API -  合流：Connect 和 CoMap
 *
 * @author lvbingbing
 * @date 2021-12-13 22:38
 */
public class FlinkTransform05 {
    public static void main(String[] args) throws Exception {
        // 1、创建FlinkTransform00对象，有参构造会初始化 env，并从文件中读取数据
        int parallelism = 1;
        FlinkTransform00 flinkTransform = new FlinkTransform00(parallelism);
        // 2、获取执行环境
        StreamExecutionEnvironment env = flinkTransform.getEnv();
        // 3、学习 connect、coMap、union
        studyConnectCoMapUnion(flinkTransform.getSensorReadingStream());
        // 4、触发程序执行
        env.execute();
    }

    /**
     * 学习 connect、coMap、union
     * <p>
     * connect()：DataStream,DataStream → ConnectedStreams：连接两个保持他们类型的数据流，两个数据流
     * 被 Connect 之后，只是被放在了一个同一个流中，内部依然保持各自的数据和形式不发生任何变化，两个流相互独立。
     * <p>
     * CoMap,CoFlatMap：ConnectedStreams → DataStream：作用于 ConnectedStreams 上，功能与 map
     * 和 flatMap 一样，对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap处理。
     * <p>
     * union()：DataStream → DataStream：对两个或者两个以上的 DataStream 进行 union 操作，
     * 产生一个包含所有 DataStream 元素的新 DataStream。
     * <p>
     * connect() 与 union() 区别：<br>
     * 1． Union 之前两个流的类型必须是一样， Connect 可以不一样，在之后的 coMap 中再去调整成为一样的。<br>
     * 2. Connect 只能操作两个流， Union 可以操作多个。
     *
     * @param sensorReadingStream <br>
     */
    private static void studyConnectCoMapUnion(DataStream<SensorReading> sensorReadingStream) {
        // 1、获取分流
        SplitStream<SensorReading> splitStream = sensorReadingStream.split(value -> value.getTemperature().compareTo(30.0) > 0 ?
                Collections.singletonList(TemperatureType.HIGH.type) : Collections.singletonList(TemperatureType.LOW.type));
        // 高温流
        DataStream<SensorReading> highStream = splitStream.select(TemperatureType.HIGH.type);
        // 低温流
        DataStream<SensorReading> lowStream = splitStream.select(TemperatureType.LOW.type);
        // 将高温流转换成二元组类型
        DataStream<Tuple2<String, Double>> highTupleStream = highStream.map(sensorReading -> new Tuple2<>(sensorReading.getId(), sensorReading.getTemperature()))
                .returns(Types.TUPLE(Types.STRING, Types.DOUBLE));
        // 2、合流：connect
        ConnectedStreams<SensorReading, Tuple2<String, Double>> connectedStreams = lowStream.connect(highTupleStream);
        // 3、对 connectedStreams 中的每一个 Stream 分别进行 map转换
        DataStream<Object> coMapStream = connectedStreams.map(new CoMapFunction<SensorReading, Tuple2<String, Double>, Object>() {
            @Override
            public Object map1(SensorReading sensorReading) {
                return new Tuple2<>(sensorReading.getId(), "normal");
            }

            @Override
            public Object map2(Tuple2<String, Double> tuple2) {
                return new Tuple3<>(tuple2.f0, tuple2.f1, "high temperature warning");
            }
        });
        coMapStream.print("coMapStream");

        // 4、union操作多个流
        DataStream<SensorReading> unionStream = highStream.union(lowStream).union(splitStream);
        unionStream.print("unionStream");
    }
}
